/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.service.client.impl;

import com.sun.jersey.api.client.WebResource;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.EagleServiceClientException;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.apache.eagle.service.client.EagleServiceSingleEntityQueryRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class EagleServiceClientImpl extends EagleServiceBaseClient {
    private static final Logger LOG = LoggerFactory.getLogger(EagleServiceClientImpl.class);
    private static final String SERVICE_HOST_KEY = "service.host";
    private static final String SERVICE_PORT_KEY = "service.port";
    private static final String SERVICE_USERNAME_KEY = "service.username";
    private static final String SERVICE_PASSWORD_KEY = "service.password";

    public EagleServiceClientImpl(String host, int port) {
        super(host, port);
    }

    @Deprecated
    public EagleServiceClientImpl(EagleServiceConnector connector) {
        this(connector.getEagleServiceHost(), connector.getEagleServicePort(), connector.getUsername(), connector.getPassword());
    }

    public EagleServiceClientImpl(Config config) {
        super(
            config.hasPath(SERVICE_HOST_KEY) ? config.getString(SERVICE_HOST_KEY) : "localhost",
            tryGetPortFromConfig(config),
            config.hasPath(SERVICE_USERNAME_KEY) ? config.getString(SERVICE_USERNAME_KEY) : null,
            config.hasPath(SERVICE_PASSWORD_KEY) ? config.getString(SERVICE_PASSWORD_KEY) : null
        );
    }

    /**
     * Try to get eagle service port from config by key: service.port no matter STRING or INT.
     */
    private static int tryGetPortFromConfig(Config config) {
        if (config.hasPath(SERVICE_PORT_KEY)) {
            try {
                return config.getInt(SERVICE_PORT_KEY);
            } catch (ConfigException.WrongType wrongType) {
                String portStr = config.getString(SERVICE_PORT_KEY);
                if (StringUtils.isNotBlank(portStr)) {
                    return Integer.valueOf(portStr);
                }
            }
        }
        return 9090;
    }

    public EagleServiceClientImpl(String host, int port, String username, String password) {
        super(host, port, username, password);
    }

    public EagleServiceClientImpl(String host, int port, String basePath, String username, String password) {
        super(host, port, basePath, username, password);
    }

    private String getWholePath(String urlString) {
        return getBaseEndpoint() + urlString;
    }

    @Override
    public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> create(List<E> entities, String serviceName) throws IOException, EagleServiceClientException {
        checkNotNull(serviceName, "serviceName");
        checkNotNull(entities, "entities");

        final GenericServiceAPIResponseEntity<String> response;
        response = postEntitiesWithService(GENERIC_ENTITY_PATH, entities, serviceName);
        if (!response.isSuccess()) {
            LOG.error("Failed to create entities for service: " + serviceName);
        }
        return response;
    }

    @Override
    public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> create(List<E> entities) throws IOException, EagleServiceClientException {
        checkNotNull(entities, "entities");

        Map<String, List<E>> serviceEntityMap = groupEntitiesByService(entities);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating entities for " + serviceEntityMap.keySet().size() + " services");
        }

        List<String> createdKeys = new LinkedList<String>();

        for (Map.Entry<String, List<E>> entry : serviceEntityMap.entrySet()) {
            GenericServiceAPIResponseEntity<String> response = create(entry.getValue(), entry.getKey());
            if (!response.isSuccess()) {
                throw new IOException("Service side exception: " + response.getException());
            } else if (response.getObj() != null) {
                createdKeys.addAll(response.getObj());
            }
        }
        GenericServiceAPIResponseEntity<String> entity = new GenericServiceAPIResponseEntity<String>();
        entity.setObj(createdKeys);
        entity.setSuccess(true);
        return entity;
    }

    @Override
    public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> delete(List<E> entities) throws IOException, EagleServiceClientException {
        checkNotNull(entities, "entities");

        Map<String, List<E>> serviceEntityMap = groupEntitiesByService(entities);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating entities for " + serviceEntityMap.keySet().size() + " services");
        }

        List<String> deletedKeys = new LinkedList<String>();
        for (Map.Entry<String, List<E>> entry : serviceEntityMap.entrySet()) {
            GenericServiceAPIResponseEntity<String> response = delete(entry.getValue(), entry.getKey());
            if (!response.isSuccess()) {
                LOG.error("Got service exception: " + response.getException());
                throw new IOException(response.getException());
            } else if (response.getObj() != null) {
                deletedKeys.addAll(response.getObj());
            }
        }
        GenericServiceAPIResponseEntity<String> entity = new GenericServiceAPIResponseEntity<String>();
        entity.setObj(deletedKeys);
        entity.setSuccess(true);
        return entity;
    }

    @SuppressWarnings("unchecked")
    @Override
    public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> delete(List<E> entities, String serviceName) throws IOException, EagleServiceClientException {
        checkNotNull(entities, "entities");
        checkNotNull(serviceName, "serviceName");

        return postEntitiesWithService(GENERIC_ENTITY_DELETE_PATH, entities, serviceName);
    }

    @SuppressWarnings("unchecked")
    @Override
    public GenericServiceAPIResponseEntity<String> delete(EagleServiceSingleEntityQueryRequest request) throws IOException, EagleServiceClientException {
        String queryString = request.getQueryParameterString();
        StringBuilder sb = new StringBuilder();
        sb.append(GENERIC_ENTITY_PATH);
        sb.append("?");
        sb.append(queryString);
        final String urlString = sb.toString();

        if (!this.silence) {
            LOG.info("Going to delete by querying service: " + getWholePath(urlString));
        }
        WebResource r = getWebResource(urlString);
        return putAuthHeaderIfNeeded(r.accept(DEFAULT_MEDIA_TYPE)
            .header(CONTENT_TYPE, DEFAULT_HTTP_HEADER_CONTENT_TYPE))
            .delete(GenericServiceAPIResponseEntity.class);
    }

    @SuppressWarnings("unchecked")
    @Override
    public GenericServiceAPIResponseEntity<String> deleteById(List<String> ids, String serviceName) throws EagleServiceClientException, IOException {
        checkNotNull(serviceName, "serviceName");
        checkNotNull(ids, "ids");

        final String json = marshall(ids);
        final WebResource r = getWebResource(GENERIC_ENTITY_DELETE_PATH);
        return putAuthHeaderIfNeeded(r.queryParam(SERVICE_NAME, serviceName)
            .queryParam(DELETE_BY_ID, "true")
            .accept(DEFAULT_MEDIA_TYPE))
            .header(CONTENT_TYPE, DEFAULT_HTTP_HEADER_CONTENT_TYPE)
            .post(GenericServiceAPIResponseEntity.class, json);
    }

    @Override
    public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> update(List<E> entities) throws IOException, EagleServiceClientException {
        checkNotNull(entities, "entities");

        Map<String, List<E>> serviceEntityMap = groupEntitiesByService(entities);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Updating entities for " + serviceEntityMap.keySet().size() + " services");
        }

        List<String> createdKeys = new LinkedList<String>();

        for (Map.Entry<String, List<E>> entry : serviceEntityMap.entrySet()) {
            GenericServiceAPIResponseEntity<String> response = update(entry.getValue(), entry.getKey());
            if (!response.isSuccess()) {
                throw new IOException("Got service exception when updating service " + entry.getKey() + " : " + response.getException());
            } else {
                if (response.getObj() != null) {
                    createdKeys.addAll(response.getObj());
                }
            }
        }

        GenericServiceAPIResponseEntity<String> entity = new GenericServiceAPIResponseEntity<String>();
        entity.setObj(createdKeys);
        entity.setSuccess(true);
        return entity;
    }

    @Override
    public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> update(List<E> entities, String serviceName) throws IOException, EagleServiceClientException {
        checkNotNull(entities, "entities");
        checkNotNull(serviceName, "serviceName");

        return putEntitiesWithService(GENERIC_ENTITY_PATH, entities, serviceName);
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T extends Object> GenericServiceAPIResponseEntity<T> search(EagleServiceSingleEntityQueryRequest request) throws EagleServiceClientException {
        String queryString = request.getQueryParameterString();
        StringBuilder sb = new StringBuilder();
        sb.append(GENERIC_ENTITY_PATH);
        sb.append("?");
        sb.append(queryString);
        final String urlString = sb.toString();
        if (!this.silence) {
            LOG.info("Going to query service: " + getWholePath(urlString));
        }
        WebResource r = getWebResource(urlString);
        return putAuthHeaderIfNeeded(r.accept(DEFAULT_MEDIA_TYPE))
            .header(CONTENT_TYPE, DEFAULT_HTTP_HEADER_CONTENT_TYPE)
            .get(GenericServiceAPIResponseEntity.class);
    }
}